1 module dpq.connection;
2 
3 //import derelict.pq.pq;
4 import libpq.libpq;
5 
6 import dpq.exception;
7 import dpq.result;
8 import dpq.value;
9 import dpq.attributes;
10 import dpq.querybuilder;
11 import dpq.meta;
12 import dpq.prepared;
13 import dpq.smartptr;
14 import dpq.serialisation : isAnyNull;
15 
16 import std..string;
17 //import derelict.pq.pq;
18 import libpq.libpq;
19 import std.conv : to;
20 import std.traits;
21 import std.typecons;
22 
23 
version(unittest)
{
	// Connection shared by the unittest blocks in this module: the first
	// test assigns it and the later ones reuse it.
	Connection c;

	// writeln is used by the tests for progress output.
	import std.stdio;
}
29 
30 /**
31 	Represents the PostgreSQL connection and allows executing queries on it.
32 
33 	Examples:
34 	-------------
35 	auto conn = Connection("host=localhost dbname=testdb user=testuser");
36 	//conn.exec ...
37 	-------------
38 */
39 struct Connection
40 {
41 	alias ConnectionPtr = SmartPointer!(PGconn*, PQfinish);
42 
43 	private ConnectionPtr _connection;
44 	private PreparedStatement[string] _prepared;
45 
46 	/**
47 		Connection constructor
48 
49 		Params:
50 			connString = connection string
51 
52 		See Also:
53 			http://www.postgresql.org/docs/9.3/static/libpq-connect.html#LIBPQ-CONNSTRING
54 	*/
55 	this(string connString)
56 	{
57 		char* err;
58 		auto opts = PQconninfoParse(cast(char*)connString.toStringz, &err);
59 
60 		if (err != null)
61 			throw new DPQException(err.fromStringz.to!string);
62 
63 		_connection = new ConnectionPtr(PQconnectdb(connString.toStringz));
64 
65 		if (status != CONNECTION_OK)
66 			throw new DPQException(errorMessage);
67 
68 		_dpqLastConnection = &this;
69 	}
70 
71 	unittest
72 	{
73 		c = Connection("host=127.0.0.1 dbname=test user=test");
74 		writeln(" * Database connection with connection string");
75 		assert(c.status == CONNECTION_OK);
76 	}
77 
78 	/** 
79 		Close the connection manually 
80 	*/
81 	void close()
82 	{
83 		_connection.clear();
84 	}
85 
86 	@property const(ConnStatusType) status()
87 	{
88 		return PQstatus(_connection);
89 	}
90 
91 	/** Returns the name of the database currently selected */
92 	@property const(string) db()
93 	{
94 		return PQdb(_connection).to!string;
95 	}
96 
97 	/** Returns the name of the current user */
98 	@property const(string) user()
99 	{
100 		return PQuser(_connection).to!string;
101 	}
102 
103 	/// ditto, but password
104 	@property const(string) password()
105 	{
106 		return PQpass(_connection).to!string;
107 	}
108 
109 	/// ditto, but host
110 	@property const(string) host()
111 	{
112 		return PQhost(_connection).to!string;
113 	}
114 
115 	/// ditto, but port
116 	@property const(ushort) port()
117 	{
118 		return PQport(_connection).fromStringz.to!ushort;
119 	}
120 
121 	/** // FIXME: BROKEN ATM
122 		Executes the given string directly
123 
124 		Throws on fatal query errors like bad syntax
125 
126 		Examples:
127 		----------------
128 		Connection conn; // An established connection
129 
130 		conn.exec("CREATE TABLE IF NOT EXISTS test_table");
131 		----------------
132 	*/
133 	Result exec(string command)
134 	{
135 		PGresult* res = PQexec(_connection, cast(const char*)command.toStringz);
136 		return Result(res);
137 	}
138 
139 	/*
140 	unittest
141 	{
142 		auto res = c.exec("SELECT 1::INT4 AS int4, 2::INT8 AS some_long");
143 		writeln(" * exec for selecting INT4 and INT8");
144 		assert(res.rows == 1);
145 		assert(res.columns == 2);
146 
147 		auto r = res[0];
148 		assert(r[0].as!int == 1);
149 		assert(r[0].as!long == 2);
150 
151 		writeln(" * Row opIndex(int) and opIndex(string) equality ");
152 		assert(r[0] == r["int4"]);
153 		assert(r[1] == r["some_long"]);
154 
155 	}
156 	*/
157 
158 	/// ditto, async
159 	bool send(string command)
160 	{
161 		return PQsendQuery(_connection, cast(const char*)command.toStringz) == 1;
162 	}
163 
164 	/**
165 		Executes the given string with given params
166 
167 		Params should be given as $1, $2, ... $n in the actual command.
168 		All params are sent in a binary format and should not be escaped.
169 		If a param's type cannot be inferred, this method will throw an exception,
170 		in this case, either specify the type using the :: (cast) notation or
171 		make sure the type can be inferred by PostgreSQL in your query.
172 
173 		Examples:
174 		----------------
175 		Connection conn; // An established connection
176 
		conn.execParams("SELECT $1::TEXT, $2::INT4, $3::FLOAT8", "foo", 42, 3.14);
178 		----------------
179 
180 		See also:
181 			http://www.postgresql.org/docs/9.3/static/libpq-exec.html
182 	*/
183 	Result execParams(T...)(string command, T params)
184 	{
185 		Value[] values;
186 		foreach(param; params)
187 			values ~= Value(param);
188 
189 		return execParams(command, values);
190 	}
191 
192 
193 	void sendParams(T...)(string command, T params)
194 	{
195 		Value[] values;
196 		foreach(param; params)
197 			values ~= Value(param);
198 
199 		execParams(command, values, true);
200 	}
201 
202 	unittest
203 	{
204 		auto res = c.execParams("SELECT 1::INT4 AS int4, 2::INT8 AS some_long", []);
205 		writeln("\t * execParams");
206 		writeln("\t\t * Rows and cols");
207 
208 		assert(res.rows == 1);
209 		assert(res.columns == 2);
210 
211 		writeln("\t\t * Static values");
212 		auto r = res[0];
213 		assert(r[0].as!int == 1);
214 		assert(r[1].as!long == 2);
215 
216 		writeln("\t\t * opIndex(int) and opIndex(string) equality");
217 		assert(r[0] == r["int4"]);
218 		assert(r[1] == r["some_long"]);
219 
220 		int int4 = 1;
221 		long int8 = 2;
222 		string str = "foo bar baz";
223 		float float4 = 3.14;
224 		double float8 = 3.1415;
225 
226 		writeln("\t\t * Passed values");
227 		res = c.execParams(
228 				"SELECT $1::INT4, $2::INT8, $3::TEXT, $4::FLOAT4, $5::FLOAT8",
229 				int4,
230 				int8,
231 				str,
232 				float4,
233 				float8);
234 
235 		assert(res.rows == 1);
236 		r = res[0];
237 
238 		assert(r[0].as!int == int4);
239 		assert(r[1].as!long == int8);
240 		assert(r[2].as!string == str);
241 		assert(r[3].as!float == float4);
242 		assert(r[4].as!double == float8);
243 	}
244 
245 	/// ditto, but taking an array of params, instead of variadic template
246 	Result execParams(string command, Value[] params, bool async = false)
247 	{
248 		const char* cStr = cast(const char*) command.toStringz;
249 
250 		auto pTypes = params.paramTypes;
251 		auto pValues = params.paramValues;
252 		auto pLengths = params.paramLengths;
253 		auto pFormats = params.paramFormats;
254 
255 		if (async)
256 		{
257 			PQsendQueryParams(
258 				_connection,
259 				cStr,
260 				params.length.to!int,
261 				pTypes.ptr,
262 				cast(const(char*)*)pValues.ptr,
263 				pLengths.ptr,
264 				pFormats.ptr,
265 				1);
266 
267 			return Result(null);
268 		}
269 		else
270 			return Result(PQexecParams(
271 					_connection, 
272 					cStr, 
273 					params.length.to!int,
274 					pTypes.ptr, 
275 					cast(const(char*)*)pValues.ptr,
276 					pLengths.ptr,
277 					pFormats.ptr,
278 					1));
279 	}
280 
281 	/// ditto, async
282 	void sendParams(string command, Value[] params)
283 	{
284 		execParams(command, params, true);
285 	}
286 
287 	/**
288 		Returns the last error message
289 
290 		Examples:
291 		--------------------
292 		Connection conn; // An established connection
293 
294 		writeln(conn.errorMessage);
295 		--------------------
296 		
297 	 */
298 	@property string errorMessage()
299 	{
300 		return PQerrorMessage(_connection).to!string;
301 	}
302 
303 	unittest
304 	{
305 		writeln("\t * errorMessage");
306 		try
307 		{
308 			c.execParams("SELECT_BADSYNTAX $1::INT4", 1);
309 		}
310 		catch {}
311 
312 		assert(c.errorMessage.length != 0);
313 
314 	}
315 
316 	/**
317 		Generates and runs the DDL from the given structures
318 
319 		Attributes from dpq.attributes should be used to define
320 		primary keys, indexes, and relationships.
321 
322 		A custom type can be specified with the @type attribute.
323 
324 		Examples:
325 		-----------------------
326 		Connection conn; // An established connection
327 		struct User 
328 		{
329 			@serial8 @PKey long id;
330 			string username;
331 			byte[] passwordHash;
332 		};
333 
334 		struct Article { ... };
335 
336 		conn.ensureSchema!(User, Article);
337 		-----------------------
338 	*/
339 	void ensureSchema(T...)(bool createType = false)
340 	{
341 		import std.stdio;
342 		string[] additional;
343 		string[] relations;
344 
345 		foreach (type; T)
346 		{
347 			enum name = relationName!(type);
348 			relations ~= name;
349 
350 			string str;
351 			if (createType)
352 				str = "CREATE TYPE \"" ~ name ~ "\" AS (%s)";
353 			else
354 				str = "CREATE TABLE IF NOT EXISTS \"" ~ name ~ "\" (%s)";
355 
356 			string cols;
357 			foreach(m; serialisableMembers!type)
358 			{
359 				string colName = attributeName!(mixin("type." ~ m));
360 				cols ~= "\"" ~ colName ~ "\"";
361 
362 				// HACK: typeof a @property seems to be failing hard
363 				static if (is(FunctionTypeOf!(mixin("type." ~ m)) == function))
364 					alias t = typeof(mixin("type()." ~ m));
365 				else
366 					alias t = typeof(mixin("type." ~ m));
367 
368 				cols ~= " ";
369 
370 				// Basic data types
371 				static if (hasUDA!(mixin("type." ~ m), PGTypeAttribute))
372 					cols ~= getUDAs!(mixin("type." ~ m), PGTypeAttribute)[0].type;
373 				else
374 				{
375 					alias tu = Unqual!t;
376 					static if (ShouldRecurse!(mixin("type." ~ m)))
377 					{
378 						ensureSchema!(NoNullable!tu)(true);
379 						cols ~= '"' ~ relationName!(NoNullable!tu) ~ '"';
380 					}
381 					else
382 						cols ~= SQLType!tu;
383 				}
384 				
385 				// Primary key
386 				static if (hasUDA!(mixin("type." ~ m), PrimaryKeyAttribute))
387 				{
388 					if (!createType)
389 						cols ~= " PRIMARY KEY";
390 				}
391 				// Index
392 				else static if (hasUDA!(mixin("type." ~ m), IndexAttribute))
393 				{
394 					enum uda = getUDAs!(mixin("type." ~ m), IndexAttribute)[0];
395 					additional ~= "CREATE%sINDEX \"%s\" ON \"%s\" (\"%s\")".format(
396 							uda.unique ? " UNIQUE " : " ",
397 							"%s_%s_index".format(name, colName),
398 							name,
399 							colName);
400 
401 					// DEBUG
402 				}
403 				// Foreign key
404 				else static if (hasUDA!(mixin("type." ~ m), ForeignKeyAttribute))
405 				{
406 					enum uda = getUDAs!(mixin("type." ~ m), ForeignKeyAttribute)[0];
407 					additional ~= 
408 						"ALTER TABLE \"%s\" ADD CONSTRAINT \"%s\" FOREIGN KEY (\"%s\") REFERENCES \"%s\" (\"%s\")".format(
409 								name,
410 								"%s_%s_fk_%s".format(name, colName, uda.relation),
411 								colName,
412 								uda.relation,
413 								uda.pkey);
414 
415 					// Create an index on the FK too
416 					additional ~= "CREATE INDEX \"%s\" ON \"%s\" (\"%s\")".format(
417 							"%s_%s_fk_index".format(name, colName),
418 							name,
419 							colName);
420 
421 				}
422 
423 				cols ~= ", ";
424 			}
425 
426 			cols = cols[0 .. $ - 2];
427 			str = str.format(cols);
428 			if (createType)
429 			{
430 				try
431 				{
432 					exec(str);
433 				}
434 				catch {} // Do nothing, type already exists
435 			}
436 			else
437 				exec(str);
438 		}
439 		foreach (cmd; additional)
440 		{
441 			try
442 			{
443 				exec(cmd);
444 			}
445 			catch {} // Horrible, I know, but this just means the constraint/index already exists
446 		}
447 
448 		// After we're done creating all the types, a good idea would be to get OIDs for all of them
449 		if (relations.length > 0)
450 		{
451 			auto res = execParams(
452 					"SELECT typname, oid FROM pg_type WHERE typname IN ('%s')".format(
453 						relations.join("','")));
454 
455 			foreach (r; res)
456 				_dpqCustomOIDs[r[0].as!string] = r[1].as!Oid;
457 		}
458 
459 		
460 	}
461 
462 	unittest
463 	{
464 		// Probably needs more thorough testing, let's assume right now
465 		// everything is correct if the creating was successful.
466 
467 		writeln("\t * ensureSchema");
468 		struct Inner
469 		{
470 			string innerStr;
471 			int innerInt;
472 		}
473 
474 		struct TestTable1
475 		{
476 			@serial8 @PK long id;
477 			string str;
478 			int n;
479 			@embed Inner inner;
480 		}
481 
482 		c.ensureSchema!TestTable1;
483 		
484 		auto res = c.execParams(
485 				"SELECT COUNT(*) FROM pg_catalog.pg_tables WHERE tablename = $1",
486 				relationName!TestTable1);
487 
488 		assert(res.rows == 1);
489 		assert(res[0][0].as!long == 1);
490 
491 		c.exec("DROP TABLE " ~ relationName!TestTable1);
492 		c.exec("DROP TYPE \"" ~ relationName!Inner ~ "\" CASCADE");
493 	}
494 
495 	/**
496 		Returns the requested structure or a Nullable null value if no rows are returned
497 
498 		This method queries for the given structure by its primary key. If no
499 		primary key can be found, a compile-time error will be generated.
500 
501 		Examples:
502 		----------------------
503 		Connection conn; // An established connection
504 		struct User
505 		{
506 			@serial @PKey int id;
507 			...
508 		};
509 
510 		auto user = conn.findOne!User(1); // will search by the id attribute
511 		----------------------
512 	*/
513 	Nullable!T findOne(T, U)(U id)
514 	{
515 		return findOneBy!T(primaryKeyAttributeName!T, id);
516 	}
517 
518 	unittest
519 	{
520 		writeln("\t * findOne(T)(U id), findOneBy, findOne");
521 		struct Testy
522 		{
523 			@serial @PK int id;
524 			string foo;
525 			int bar;
526 			long baz;
527 			int[] intArr;
528 			//string[] stringArr; // TODO: string[]
529 		}
530 
531 		c.ensureSchema!Testy;
532 
533 		writeln("\t\t * Null result");
534 		auto shouldBeNull = c.findOne!Testy(0);
535 		assert(shouldBeNull.isNull);
536 
537 		c.exec("INSERT INTO " ~ relationName!Testy ~ " (id, foo, bar, baz, " ~ attributeName!(Testy.intArr) ~ ") "~
538 				"VALUES (1, 'somestr', 2, 3, '{1,2,3}')");
539 
540 		writeln("\t\t * Valid result");
541 		Testy t = c.findOne!Testy(1);
542 		assert(t.id == 1, `t.id == 1` );
543 		assert(t.foo == "somestr", `t.foo == "somestr"`);
544 		assert(t.bar == 2, `t.bar == 2`);
545 		assert(t.baz == 3, `t.baz == 3`);
546 		assert(t.intArr == [1,2,3], `t.intArr == [1,2,3]`);
547 		//assert(t.stringArr == ["asd", "qwe"]);
548 
549 		writeln("\t\t * findOne with custom filter");
550 		Testy t2 = c.findOne!Testy("id = $1", 1);
551 		assert(t == t2);
552 
553 		c.exec("DROP TABLE " ~ relationName!Testy);
554 	}
555 
556 	/**
		Returns the requested structure, searching by the given column name
		with the given value.
		If no rows are returned, a Nullable null value is returned
560 
561 		Examples:
562 		----------------------
563 		Connection conn; // An established connection
564 		struct User
565 		{
566 			@serial @PKey int id;
567 			...
568 		};
569 
570 		auto user = conn.findOneBy!User("id", 1); // will search by "id"
571 		----------------------
572 	*/
573 	Nullable!T findOneBy(T, U)(string col, U val)
574 	{
575 		import std.stdio;
576 
577 		auto members = AttributeList!T;
578 
579 		QueryBuilder qb;
580 		qb.select(members)
581 			.from(relationName!T)
582 			.where( col ~ " = {col_" ~ col ~ "}")
583 			.limit(1);
584 
585 		qb["col_" ~ col] = val;
586 
587 		auto q = qb.query(this);
588 
589 		auto r = q.run();
590 		if (r.rows == 0)
591 			return Nullable!T.init;
592 
593 		//return T();
594 		
595 		auto res = deserialise!T(r[0]);
596 		return Nullable!T(res);
597 	}
598 	
599 	/**
600 		Returns the requested structure, searches by the specified filter
601 		with given params
602 
		The filter is not further escaped, so the programmer needs to make sure
		to properly escape or enclose reserved keywords (like user -> "user")
		so PostgreSQL can understand them.

		If no rows are returned, a Nullable null value is returned
608 
609 		Examples:
610 		----------------------
611 		Connection conn; // An established connection
612 		struct User
613 		{
614 			@serial @PKey int id;
615 			string username;
616 			int posts;
617 		};
618 
619 		auto user = conn.findOne!User("username = $1 OR posts > $2", "foo", 42);
620 		if (!user.isNull)
621 		{
622 			... // do something
623 		}
624 		----------------------
625 	*/
626 	Nullable!T findOne(T, U...)(string filter, U vals)
627 	{
628 		QueryBuilder qb;
629 		qb.select(AttributeList!T)
630 			.from(relationName!T)
631 			.where(filter)
632 			.limit(1);
633 
634 		auto q = qb.query(this);
635 		auto r = q.run(vals);
636 
637 		if (r.rows == 0)
638 			return Nullable!T.init;
639 
640 		auto res = deserialise!T(r[0]);
641 		return Nullable!T(res);
642 	}
643 
644 	/**
645 		Returns an array of the specified type, filtered with the given filter and
646 		params
647 
648 		If no rows are returned by PostgreSQL, an empty array is returned.
649 
650 		Examples:
651 		----------------------
652 		Connection conn; // An established connection
653 		struct User
654 		{
655 			@serial @PKey int id;
656 			string username;
657 			int posts;
658 		};
659 
660 		auto users = conn.find!User("username = $1 OR posts > $2", "foo", 42);
661 		foreach (u; users)
662 		{
663 			... // do something
664 		}
665 		----------------------
666 	*/
667 	T[] find(T, U...)(string filter = "", U vals = U.init)
668 	{
669 		QueryBuilder qb;
670 		qb.select(AttributeList!T)
671 			.from(relationName!T)
672 			.where(filter);
673 
674 		auto q = qb.query(this);
675 
676 		T[] res;
677 		foreach (r; q.run(vals))
678 			res ~= deserialise!T(r);
679 
680 		return res;
681 	}
682 
683 	unittest
684 	{
685 		writeln("\t * find");
686 
687 		@relation("find_test")
688 		struct Test
689 		{
690 			@serial @PK int id;
691 			@attr("my_n") int n;
692 		}
693 
694 		c.ensureSchema!Test;
695 
696 		Test t;
697 		t.n = 1;
698 
699 		c.insert(t);
700 		c.insert(t);
701 		++t.n;
702 		c.insert(t);
703 		c.insert(t);
704 		c.insert(t);
705 
706 		Test[] ts = c.find!Test("my_n = $1", 1);
707 		assert(ts.length == 2);
708 		ts = c.find!Test("my_n > 0");
709 		assert(ts.length == 5);
710 		ts = c.find!Test("false");
711 		assert(ts.length == 0);
712 
713 		c.exec("DROP TABLE find_test");
714 	}
715 
716 	int update(T, U...)(string filter, string update, U vals)
717 	{
718 		QueryBuilder qb;
719 		qb.update(relationName!T)
720 			.set(update)
721 			.where(filter);
722 
723 		auto r = qb.query(this).run(vals);
724 		return r.rows;
725 	}
726 
727 	unittest
728 	{
729 		writeln("\t * update");
730 
731 		@relation("update_test")
732 		struct Test
733 		{
734 			@serial @PK int id;
735 			int n;
736 		}
737 
738 		c.ensureSchema!Test;
739 
740 		Test t;
741 		t.n = 5;
742 		c.insert(t);
743 
744 		int nUpdates = c.update!Test("n = $1", "n = $2", 5, 123);
745 		assert(nUpdates == 1, `nUpdates == 1`);
746 
747 		t = c.findOneBy!Test("n", 123);
748 		assert(t.n == 123, `t.n == 123`);
749 
750 		writeln("\t\t * async");
751 		c.updateAsync!Test("n = $1", "n = $2", 123, 6);
752 		auto r = c.nextResult();
753 
754 		assert(r.rows == 1);
755 		assert(!c.findOneBy!Test("n", 6).isNull);
756 
757 		c.exec("DROP TABLE update_test");
758 	}
759 
760 	void updateAsync(T, U...)(string filter, string update, U vals)
761 	{
762 		QueryBuilder qb;
763 		qb.update(relationName!T)
764 			.set(update)
765 			.where(filter);
766 
767 		qb.query(this).runAsync(vals);
768 	}
769 
770 	int update(T, U)(U id, Value[string] updates, bool async = false)
771 	{
772 		QueryBuilder qb;
773 
774 		qb.update(relationName!T)
775 			.set(updates)
776 			.where(primaryKeyAttributeName!T, id);
777 
778 		auto q = qb.query(this);
779 
780 		if (async)
781 		{
782 			q.runAsync();
783 			return -1;
784 		}
785 
786 		auto r = q.run();
787 		return r.rows;
788 	}
789 
790 	unittest
791 	{
792 		writeln("\t * update with AA updates");
793 
794 		@relation("update_aa_test")
795 		struct Test
796 		{
797 			@serial @PK int id;
798 			int n;
799 		}
800 		c.ensureSchema!Test;
801 
802 		Test t;
803 		t.n = 1;
804 		c.insert(t);
805 
806 		int rows = c.update!Test(1, ["n": Value(2)]);
807 		assert(rows == 1, `r.rows == 1`);
808 
809 		c.exec("DROP TABLE update_aa_test");
810 	}
811 
812 	void updateAsync(T, U)(U id, Value[string] updates)
813 	{
814 		update!T(id, updates, true);
815 	}
816 
817 	int update(T, U)(U id, T updates, bool async = false)
818 	{
819 		import dpq.attributes;
820 
821 		QueryBuilder qb;
822 
823 		qb.update(relationName!T)
824 			.where(primaryKeyAttributeName!T, id);
825 
826 		foreach (m; serialisableMembers!T)
827 			qb.set(attributeName!(mixin("T." ~ m)), __traits(getMember, updates, m));
828 
829 		auto q = qb.query(this);
830 		if (async)
831 		{
832 			qb.query(this).runAsync();
833 			return -1;
834 		}
835 
836 		auto r = q.run();
837 		return r.rows;
838 	}
839 
840 	unittest
841 	{
842 		writeln("\t * update with object");
843 		
844 		@relation("update_object_test")
845 		struct Test
846 		{
847 			@serial @PK int id;
848 			int n;
849 		}
850 		c.ensureSchema!Test;
851 
852 		Test t;
853 		t.n = 1;
854 		t.id = 1; // assumptions <3
855 
856 		c.insert(t);
857 
858 		t.n = 2;
859 		c.update!Test(1, t);
860 		
861 		t = c.findOne!Test(1);
862 		assert(t.n == 2);
863 
864 		t.n = 3;
865 		c.updateAsync!Test(1, t);
866 		auto r = c.nextResult();
867 
868 		writeln("\t\t * async");
869 		assert(r.rows == 1);
870 
871 		c.exec("DROP TABLE update_object_test");
872 	}
873 
874 	void updateAsync(T, U)(U id, T updates)
875 	{
876 		update!T(id, updates, true);
877 	}
878 
879 
880 	private void addVals(T, U)(ref QueryBuilder qb, U val)
881 	{
882 		if (isAnyNull(val))
883 			qb.addValue(null);
884 		else
885 		{
886 			foreach (m; serialisableMembers!(NoNullable!T))
887 			{
888 				static if (isPK!(T, m) || hasUDA!(mixin("T." ~ m), IgnoreAttribute))
889 					continue;
890 				else
891 					qb.addValue(__traits(getMember, val, m));
892 			}
893 		}
894 	}
895 
896 	Result insertR(T)(T val, string ret = "")
897 	{
898 		QueryBuilder qb;
899 		qb.insert(relationName!T, AttributeList!(T, true, true));
900 		if (ret.length > 0)
901 			qb.returning(ret);
902 
903 		addVals!T(qb, val);
904 
905 		return qb.query(this).run();
906 	}
907 
908 	bool insert(T)(T val, bool async = false)
909 	{
910 		QueryBuilder qb;
911 		qb.insert(relationName!T, AttributeList!(T, true, true));
912 
913 		addVals!T(qb, val);
914 
915 		if (async)
916 			return qb.query(this).runAsync();
917 
918 		auto r = qb.query(this).run();
919 		return r.rows > 0;
920 	}
921 
922 	unittest
923 	{
924 		writeln("\t * insert");
925 
926 		@relation("insert_test_inner")
927 		struct Inner
928 		{
929 			int bar;
930 		}
931 
932 		@relation("insert_test")
933 		struct Test
934 		{
935 			int n;
936 			Nullable!int n2;
937 			@embed Inner foo;
938 		}
939 		c.ensureSchema!Test;
940 
941 		Test t;
942 		t.n = 1;
943 		t.n2 = 2;
944 		t.foo.bar = 2;
945 		
946 		auto r = c.insert(t);
947 		assert(r == true);
948 
949 		auto r2 = c.insertR(t, "n");
950 		assert(r2.rows == 1);
951 		assert(r2[0][0].as!int == t.n);
952 
953 		Test t2 = c.findOneBy!Test("n", 1);
954 		assert(t2 == t, t.to!string ~ " != " ~ t2.to!string);
955 
956 		writeln("\t\t * async");
957 		t.n = 123;
958 		t.n2.nullify;
959 		c.insertAsync(t);
960 		
961 		auto res = c.nextResult();
962 		assert(res.rows == 1);
963 		t2 = c.findOneBy!Test("n", 123);
964 		assert(t.n2.isNull);
965 		assert(t2.n2.isNull);
966 
967 		c.exec("DROP TABLE insert_test");
968 		c.exec("DROP TYPE \"%s\" CASCADE".format(relationName!Inner));
969 	}
970 
971 	void insertAsync(T)(T val)
972 	{
973 		insert(val, true);
974 	}
975 
976 	int remove(T, U)(U id)
977 	{
978 		QueryBuilder qb;
979 		qb.remove!T
980 			.where(primaryKeyAttributeName!T, id);
981 
982 		return qb.query(this).run().rows;
983 	}
984 
985 	bool removeAsync(T, U)(U id)
986 	{
987 		QueryBuilder qb;
988 		qb.remove!T
989 			.where(primaryKeyAttributeName!T, id);
990 
991 		return qb.query(this).runAsync() == 1;
992 	}
993 
994 
995 	int remove(T, U...)(string filter, U vals)
996 	{
997 		QueryBuilder qb;
998 		qb.remove!T
999 			.where(filter);
1000 
1001 		foreach (v; vals)
1002 			qb.addValue(v);
1003 
1004 		return qb.query(this).run().rows;
1005 	}
1006 
1007 	bool removeAsync(T, U...)(string filter, U vals)
1008 	{
1009 		QueryBuilder qb;
1010 		qb.remove!T
1011 			.where(filter);
1012 
1013 		foreach (v; vals)
1014 			qb.addValue(v);
1015 
1016 		return qb.query(this).runAsync() == 1;
1017 	}
1018 
1019 	unittest
1020 	{
1021 		@relation("remove_test")
1022 		struct Test
1023 		{
1024 			@serial @PK int id;
1025 			int n;
1026 		}
1027 		c.ensureSchema!Test;
1028 
1029 		foreach (i; 0 .. 10)
1030 			c.insert(Test(0, i));
1031 
1032 		writeln("\t * remove(id)");
1033 		int n = c.remove!Test(1);
1034 		assert(n == 1, `n == 1`);
1035 
1036 
1037 		writeln("\t\t * async");
1038 		c.removeAsync!Test(2);
1039 		auto r = c.nextResult();
1040 		assert(r.rows == 1, `r.rows == 1`);
1041 
1042 		writeln("\t * remove(filter, vals...)");
1043 		n = c.remove!Test("id IN($1,$2,$3,$4,$5)", 3, 4, 5, 6, 7);
1044 		assert(n == 5);
1045 
1046 		writeln("\t\t * async");
1047 		c.removeAsync!Test("id >= $1", 7);
1048 		r = c.nextResult();
1049 		assert(r.rows == 3);
1050 
1051 		c.exec("DROP TABLE remove_test");
1052 	}
1053 
1054 	long count(T, U...)(string filter = "", U vals = U.init)
1055 	{
1056 		import dpq.query;
1057 		auto q = Query(this);
1058 		string str = `SELECT COUNT(*) FROM "%s"`.format(relationName!T);
1059 
1060 		if (filter.length > 0)
1061 			str ~= " WHERE " ~ filter;
1062 
1063 		q = str;
1064 		auto r = q.run(vals);
1065 
1066 		return r[0][0].as!long;
1067 	}
1068 
1069 	unittest
1070 	{
1071 		writeln("\t * count");
1072 
1073 		@relation("test_count")
1074 		struct Test
1075 		{
1076 			@serial @PK int id;
1077 			int n;
1078 		}
1079 
1080 		c.ensureSchema!Test;
1081 
1082 		Test t;
1083 		c.insert(t);
1084 
1085 		assert(c.count!Test == 1, `count == 1`);
1086 		c.insert(t);
1087 		assert(c.count!Test == 2, `count == 2`);
1088 
1089 		c.exec("DROP TABLE test_count");
1090 	}
1091 
1092 	bool isBusy()
1093 	{
1094 		return PQisBusy(_connection) == 1;
1095 	}
1096 
1097 	unittest
1098 	{
1099 		writeln("\t * isBusy");
1100 
1101 		assert(c.isBusy() == false);
1102 
1103 		c.send("SELECT 1::INT");
1104 
1105 		assert(c.isBusy() == true);
1106 
1107 		c.nextResult();
1108 		assert(c.isBusy() == false);
1109 	}
1110 
1111 
1112 	/**
1113 		 Blocks until a result is read, then returns it
1114 
1115 		 If no more results remain, a null result will be returned
1116 
1117 		 Make sure to call this until a null is returned.
1118 	*/
1119 	Result nextResult()
1120 	{
1121 		import core.thread;
1122 
1123 		/*
1124 		do
1125 		{
1126 			PQconsumeInput(_connection);
1127 			//Thread.sleep(dur!"msecs"(1)); // What's a reasonable amount of time to wait?
1128 		}
1129 		while (isBusy());
1130 		*/
1131 
1132 		PGresult* res = PQgetResult(_connection);
1133 		return Result(res);
1134 	}
1135 
1136 	Result[] allResults()
1137 	{
1138 		Result[] res;
1139 		
1140 		PGresult* r;
1141 		while ((r = PQgetResult(_connection)) != null)
1142 			res ~= Result(r);
1143 
1144 		return res;
1145 	}
1146 	
1147 	Result lastResult()
1148 	{
1149 		Result res;
1150 
1151 		PGresult* r;
1152 		while ((r = PQgetResult(_connection)) != null)
1153 			res = Result(r);
1154 
1155 		return res;
1156 	}
1157 
1158 	unittest
1159 	{
1160 		writeln("\t * nextResult");
1161 		auto x = c.nextResult();
1162 		assert(x.isNull);
1163 
1164 		int int1 = 1;
1165 		int int2 = 2;
1166 
1167 		c.sendParams("SELECT $1", int1);
1168 
1169 		// In every way the same as lastResult
1170 		Result r, t;
1171 		while(!(t = c.nextResult()).isNull)
1172 			r = t;
1173 
1174 		assert(r.rows == 1);
1175 		assert(r.columns == 1);
1176 		assert(r[0][0].as!int == int1);
1177 
1178 		writeln("\t * lastResult");
1179 		c.sendParams("SELECT $1", int2);
1180 		r = c.lastResult();
1181 
1182 		assert(r.rows == 1);
1183 		assert(r.columns == 1);
1184 		assert(r[0][0].as!int == int2);
1185 	}
1186 
1187 	Result prepare(T...)(string name, string command, T paramTypes)
1188 	{
1189 		Oid[] oids;
1190 		foreach (pType; paramTypes)
1191 			oids ~= pType;
1192 
1193 		char* cName = cast(char*) name.toStringz;
1194 		char* cComm = cast(char*) command.toStringz;
1195 
1196 		auto p = PreparedStatement(this, name, command, oids);
1197 		_prepared[name] = p;
1198 
1199 		return Result(PQprepare(
1200 					_connection,
1201 					cName,
1202 					cComm,
1203 					oids.length.to!int,
1204 					oids.ptr));
1205 	}
1206 
1207 	Result execPrepared(string name, Value[] params...)
1208 	{
1209 		char* cStr = cast(char*) name.toStringz;
1210 
1211 		return Result(PQexecPrepared(
1212 					_connection,
1213 					cStr,
1214 					params.length.to!int,
1215 					cast(char**) params.paramValues.ptr,
1216 					params.paramLengths.ptr,
1217 					params.paramFormats.ptr,
1218 					1));
1219 	}
1220 
1221 	Result execPrepared(T...)(string name, T params)
1222 	{
1223 		Value[] vals;
1224 		foreach (p; params)
1225 			vals ~= Value(p);
1226 
1227 		return execPrepared(name, vals);
1228 	}
1229 
1230 	bool sendPrepared(string name, Value[] params...)
1231 	{
1232 		char* cStr = cast(char*) name.toStringz;
1233 
1234 		return PQsendQueryPrepared(
1235 				_connection,
1236 				cStr,
1237 				params.length.to!int,
1238 				cast(char**) params.paramValues.ptr,
1239 				params.paramLengths.ptr,
1240 				params.paramFormats.ptr,
1241 				1) == 1;
1242 	}
1243 
1244 	bool sendPrepared(T...)(string name, T params)
1245 	{
1246 		Value[] vals;
1247 		foreach (p; params)
1248 			vals ~= Value(p);
1249 
1250 		return sendPrepared(name, vals);
1251 	}
1252 
1253 	unittest
1254 	{
1255 		writeln("\t * prepare");
1256 		// The result of this isn't really all that useful, but as long as it 
1257 		// throws on errors, it kinda is
1258 		c.prepare("prepare_test", "SELECT $1", Type.INT4);
1259 
1260 		writeln("\t * execPrepared");
1261 		auto r = c.execPrepared("prepare_test", 1);
1262 		assert(r.rows == 1);
1263 		assert(r[0][0].as!int == 1);
1264 
1265 		writeln("\t\t * sendPrepared");
1266 		bool s = c.sendPrepared("prepare_test", 1);
1267 		assert(s);
1268 
1269 		r = c.lastResult();
1270 		assert(r.rows == 1);
1271 		assert(r[0][0].as!int == 1);
1272 	}
1273 	
1274 	ref PreparedStatement prepared(string name)
1275 	{
1276 		return _prepared[name];
1277 	}
1278 
1279 	ref PreparedStatement opIndex(string name)
1280 	{
1281 		return prepared(name);
1282 	}
1283 }
1284 
1285 
1286 /**
1287 	Deserialises the given Row to the requested type
1288 
1289 	Params:
1290 		T  = (template) type to deserialise into
1291 		r  = Row to deserialise
1292 */
T deserialise(T)(Row r, string prefix = "")
{
	T result;
	foreach (m; serialisableMembers!T)
	{
		alias MemberType = Unqual!(typeof(mixin("T." ~ m)));
		enum column = attributeName!(mixin("T." ~ m));

		try
		{
			// Columns are looked up by prefix + attribute name; a null cell
			// leaves the member at its default value.
			auto cell = r[prefix ~ column].as!MemberType;
			if (!cell.isNull)
				__traits(getMember, result, m) = cast(TypedefType!(MemberType)) cell;
		}
		catch (DPQException e)
		{
			// Failures are only tolerated for Nullable members; for any
			// other member type the exception is passed on.
			static if (!isInstanceOf!(Nullable, MemberType))
				throw e;
		}
	}
	return result;
}
1315 
/// Holds the last created connection; not to be used outside the library
1317 package Connection* _dpqLastConnection;
1318 
1319 /// Holds a list of all the OIDs of our custom types, indexed by their relationName
1320 package Oid[string] _dpqCustomOIDs;
1321